home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_tarfile.py < prev    next >
Text File  |  2005-10-18  |  13KB  |  436 lines

  1. import sys
  2. import os
  3. import shutil
  4. import tempfile
  5.  
  6. import unittest
  7. import tarfile
  8.  
  9. from test import test_support
  10.  
  11. # Check for our compression modules.
  12. try:
  13.     import gzip
  14.     gzip.GzipFile
  15. except (ImportError, AttributeError):
  16.     gzip = None
  17. try:
  18.     import bz2
  19. except ImportError:
  20.     bz2 = None
  21.  
  22. def path(path):
  23.     return test_support.findfile(path)
  24.  
  25. testtar = path("testtar.tar")
  26. tempdir = os.path.join(tempfile.gettempdir(), "testtar" + os.extsep + "dir")
  27. tempname = test_support.TESTFN
  28. membercount = 10
  29.  
  30. def tarname(comp=""):
  31.     if not comp:
  32.         return testtar
  33.     return os.path.join(tempdir, "%s%s%s" % (testtar, os.extsep, comp))
  34.  
  35. def dirname():
  36.     if not os.path.exists(tempdir):
  37.         os.mkdir(tempdir)
  38.     return tempdir
  39.  
  40. def tmpname():
  41.     return tempname
  42.  
  43.  
  44. class BaseTest(unittest.TestCase):
  45.     comp = ''
  46.     mode = 'r'
  47.     sep = ':'
  48.  
  49.     def setUp(self):
  50.         mode = self.mode + self.sep + self.comp
  51.         self.tar = tarfile.open(tarname(self.comp), mode)
  52.  
  53.     def tearDown(self):
  54.         self.tar.close()
  55.  
  56. class ReadTest(BaseTest):
  57.  
  58.     def test(self):
  59.         """Test member extraction.
  60.         """
  61.         members = 0
  62.         for tarinfo in self.tar:
  63.             members += 1
  64.             if not tarinfo.isreg():
  65.                 continue
  66.             f = self.tar.extractfile(tarinfo)
  67.             self.assert_(len(f.read()) == tarinfo.size,
  68.                          "size read does not match expected size")
  69.             f.close()
  70.  
  71.         self.assert_(members == membercount,
  72.                      "could not find all members")
  73.  
  74.     def test_sparse(self):
  75.         """Test sparse member extraction.
  76.         """
  77.         if self.sep != "|":
  78.             f1 = self.tar.extractfile("S-SPARSE")
  79.             f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
  80.             self.assert_(f1.read() == f2.read(),
  81.                          "_FileObject failed on sparse file member")
  82.  
  83.     def test_readlines(self):
  84.         """Test readlines() method of _FileObject.
  85.         """
  86.         if self.sep != "|":
  87.             filename = "0-REGTYPE-TEXT"
  88.             self.tar.extract(filename, dirname())
  89.             lines1 = file(os.path.join(dirname(), filename), "rU").readlines()
  90.             lines2 = self.tar.extractfile(filename).readlines()
  91.             self.assert_(lines1 == lines2,
  92.                          "_FileObject.readline() does not work correctly")
  93.  
  94.     def test_seek(self):
  95.         """Test seek() method of _FileObject, incl. random reading.
  96.         """
  97.         if self.sep != "|":
  98.             filename = "0-REGTYPE"
  99.             self.tar.extract(filename, dirname())
  100.             data = file(os.path.join(dirname(), filename), "rb").read()
  101.  
  102.             tarinfo = self.tar.getmember(filename)
  103.             fobj = self.tar.extractfile(tarinfo)
  104.  
  105.             text = fobj.read()
  106.             fobj.seek(0)
  107.             self.assert_(0 == fobj.tell(),
  108.                          "seek() to file's start failed")
  109.             fobj.seek(2048, 0)
  110.             self.assert_(2048 == fobj.tell(),
  111.                          "seek() to absolute position failed")
  112.             fobj.seek(-1024, 1)
  113.             self.assert_(1024 == fobj.tell(),
  114.                          "seek() to negative relative position failed")
  115.             fobj.seek(1024, 1)
  116.             self.assert_(2048 == fobj.tell(),
  117.                          "seek() to positive relative position failed")
  118.             s = fobj.read(10)
  119.             self.assert_(s == data[2048:2058],
  120.                          "read() after seek failed")
  121.             fobj.seek(0, 2)
  122.             self.assert_(tarinfo.size == fobj.tell(),
  123.                          "seek() to file's end failed")
  124.             self.assert_(fobj.read() == "",
  125.                          "read() at file's end did not return empty string")
  126.             fobj.seek(-tarinfo.size, 2)
  127.             self.assert_(0 == fobj.tell(),
  128.                          "relative seek() to file's start failed")
  129.             fobj.seek(512)
  130.             s1 = fobj.readlines()
  131.             fobj.seek(512)
  132.             s2 = fobj.readlines()
  133.             self.assert_(s1 == s2,
  134.                          "readlines() after seek failed")
  135.             fobj.close()
  136.  
  137. class ReadStreamTest(ReadTest):
  138.     sep = "|"
  139.  
  140.     def test(self):
  141.         """Test member extraction, and for StreamError when
  142.            seeking backwards.
  143.         """
  144.         ReadTest.test(self)
  145.         tarinfo = self.tar.getmembers()[0]
  146.         f = self.tar.extractfile(tarinfo)
  147.         self.assertRaises(tarfile.StreamError, f.read)
  148.  
  149.     def test_stream(self):
  150.         """Compare the normal tar and the stream tar.
  151.         """
  152.         stream = self.tar
  153.         tar = tarfile.open(tarname(), 'r')
  154.  
  155.         while 1:
  156.             t1 = tar.next()
  157.             t2 = stream.next()
  158.             if t1 is None:
  159.                 break
  160.             self.assert_(t2 is not None, "stream.next() failed.")
  161.  
  162.             if t2.islnk() or t2.issym():
  163.                 self.assertRaises(tarfile.StreamError, stream.extractfile, t2)
  164.                 continue
  165.             v1 = tar.extractfile(t1)
  166.             v2 = stream.extractfile(t2)
  167.             if v1 is None:
  168.                 continue
  169.             self.assert_(v2 is not None, "stream.extractfile() failed")
  170.             self.assert_(v1.read() == v2.read(), "stream extraction failed")
  171.  
  172.         stream.close()
  173.  
  174. class WriteTest(BaseTest):
  175.     mode = 'w'
  176.  
  177.     def setUp(self):
  178.         mode = self.mode + self.sep + self.comp
  179.         self.src = tarfile.open(tarname(self.comp), 'r')
  180.         self.dstname = tmpname()
  181.         self.dst = tarfile.open(self.dstname, mode)
  182.  
  183.     def tearDown(self):
  184.         self.src.close()
  185.         self.dst.close()
  186.  
  187.     def test_posix(self):
  188.         self.dst.posix = 1
  189.         self._test()
  190.  
  191.     def test_nonposix(self):
  192.         self.dst.posix = 0
  193.         self._test()
  194.  
  195.     def test_small(self):
  196.         self.dst.add(os.path.join(os.path.dirname(__file__),"cfgparser.1"))
  197.         self.dst.close()
  198.         self.assertNotEqual(os.stat(self.dstname).st_size, 0)
  199.  
  200.     def _test(self):
  201.         for tarinfo in self.src:
  202.             if not tarinfo.isreg():
  203.                 continue
  204.             f = self.src.extractfile(tarinfo)
  205.             if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
  206.                 self.assertRaises(ValueError, self.dst.addfile,
  207.                                  tarinfo, f)
  208.             else:
  209.                 self.dst.addfile(tarinfo, f)
  210.  
  211. class WriteSize0Test(BaseTest):
  212.     mode = 'w'
  213.  
  214.     def setUp(self):
  215.         self.tmpdir = dirname()
  216.         self.dstname = tmpname()
  217.         self.dst = tarfile.open(self.dstname, "w")
  218.  
  219.     def tearDown(self):
  220.         self.dst.close()
  221.  
  222.     def test_file(self):
  223.         path = os.path.join(self.tmpdir, "file")
  224.         file(path, "w")
  225.         tarinfo = self.dst.gettarinfo(path)
  226.         self.assertEqual(tarinfo.size, 0)
  227.         file(path, "w").write("aaa")
  228.         tarinfo = self.dst.gettarinfo(path)
  229.         self.assertEqual(tarinfo.size, 3)
  230.  
  231.     def test_directory(self):
  232.         path = os.path.join(self.tmpdir, "directory")
  233.         os.mkdir(path)
  234.         tarinfo = self.dst.gettarinfo(path)
  235.         self.assertEqual(tarinfo.size, 0)
  236.  
  237.     def test_symlink(self):
  238.         if hasattr(os, "symlink"):
  239.             path = os.path.join(self.tmpdir, "symlink")
  240.             os.symlink("link_target", path)
  241.             tarinfo = self.dst.gettarinfo(path)
  242.             self.assertEqual(tarinfo.size, 0)
  243.  
  244.  
  245. class WriteStreamTest(WriteTest):
  246.     sep = '|'
  247.  
  248. class WriteGNULongTest(unittest.TestCase):
  249.     """This testcase checks for correct creation of GNU Longname
  250.        and Longlink extensions.
  251.  
  252.        It creates a tarfile and adds empty members with either
  253.        long names, long linknames or both and compares the size
  254.        of the tarfile with the expected size.
  255.  
  256.        It checks for SF bug #812325 in TarFile._create_gnulong().
  257.  
  258.        While I was writing this testcase, I noticed a second bug
  259.        in the same method:
  260.        Long{names,links} weren't null-terminated which lead to
  261.        bad tarfiles when their length was a multiple of 512. This
  262.        is tested as well.
  263.     """
  264.  
  265.     def setUp(self):
  266.         self.tar = tarfile.open(tmpname(), "w")
  267.         self.tar.posix = False
  268.  
  269.     def tearDown(self):
  270.         self.tar.close()
  271.  
  272.     def _length(self, s):
  273.         blocks, remainder = divmod(len(s) + 1, 512)
  274.         if remainder:
  275.             blocks += 1
  276.         return blocks * 512
  277.  
  278.     def _calc_size(self, name, link=None):
  279.         # initial tar header
  280.         count = 512
  281.  
  282.         if len(name) > tarfile.LENGTH_NAME:
  283.             # gnu longname extended header + longname
  284.             count += 512
  285.             count += self._length(name)
  286.  
  287.         if link is not None and len(link) > tarfile.LENGTH_LINK:
  288.             # gnu longlink extended header + longlink
  289.             count += 512
  290.             count += self._length(link)
  291.  
  292.         return count
  293.  
  294.     def _test(self, name, link=None):
  295.         tarinfo = tarfile.TarInfo(name)
  296.         if link:
  297.             tarinfo.linkname = link
  298.             tarinfo.type = tarfile.LNKTYPE
  299.  
  300.         self.tar.addfile(tarinfo)
  301.  
  302.         v1 = self._calc_size(name, link)
  303.         v2 = self.tar.offset
  304.         self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
  305.  
  306.     def test_longname_1023(self):
  307.         self._test(("longnam/" * 127) + "longnam")
  308.  
  309.     def test_longname_1024(self):
  310.         self._test(("longnam/" * 127) + "longname")
  311.  
  312.     def test_longname_1025(self):
  313.         self._test(("longnam/" * 127) + "longname_")
  314.  
  315.     def test_longlink_1023(self):
  316.         self._test("name", ("longlnk/" * 127) + "longlnk")
  317.  
  318.     def test_longlink_1024(self):
  319.         self._test("name", ("longlnk/" * 127) + "longlink")
  320.  
  321.     def test_longlink_1025(self):
  322.         self._test("name", ("longlnk/" * 127) + "longlink_")
  323.  
  324.     def test_longnamelink_1023(self):
  325.         self._test(("longnam/" * 127) + "longnam",
  326.                    ("longlnk/" * 127) + "longlnk")
  327.  
  328.     def test_longnamelink_1024(self):
  329.         self._test(("longnam/" * 127) + "longname",
  330.                    ("longlnk/" * 127) + "longlink")
  331.  
  332.     def test_longnamelink_1025(self):
  333.         self._test(("longnam/" * 127) + "longname_",
  334.                    ("longlnk/" * 127) + "longlink_")
  335.  
  336. class ExtractHardlinkTest(BaseTest):
  337.  
  338.     def test_hardlink(self):
  339.         """Test hardlink extraction (bug #857297)
  340.         """
  341.         # Prevent errors from being caught
  342.         self.tar.errorlevel = 1
  343.  
  344.         self.tar.extract("0-REGTYPE", dirname())
  345.         try:
  346.             # Extract 1-LNKTYPE which is a hardlink to 0-REGTYPE
  347.             self.tar.extract("1-LNKTYPE", dirname())
  348.         except EnvironmentError, e:
  349.             import errno
  350.             if e.errno == errno.ENOENT:
  351.                 self.fail("hardlink not extracted properly")
  352.  
  353.  
  354. # Gzip TestCases
  355. class ReadTestGzip(ReadTest):
  356.     comp = "gz"
  357. class ReadStreamTestGzip(ReadStreamTest):
  358.     comp = "gz"
  359. class WriteTestGzip(WriteTest):
  360.     comp = "gz"
  361. class WriteStreamTestGzip(WriteStreamTest):
  362.     comp = "gz"
  363.  
  364. # Filemode test cases
  365.  
  366. class FileModeTest(unittest.TestCase):
  367.     def test_modes(self):
  368.         self.assertEqual(tarfile.filemode(0755), '-rwxr-xr-x')
  369.         self.assertEqual(tarfile.filemode(07111), '---s--s--t')
  370.  
  371.  
  372. if bz2:
  373.     # Bzip2 TestCases
  374.     class ReadTestBzip2(ReadTestGzip):
  375.         comp = "bz2"
  376.     class ReadStreamTestBzip2(ReadStreamTestGzip):
  377.         comp = "bz2"
  378.     class WriteTestBzip2(WriteTest):
  379.         comp = "bz2"
  380.     class WriteStreamTestBzip2(WriteStreamTestGzip):
  381.         comp = "bz2"
  382.  
  383. # If importing gzip failed, discard the Gzip TestCases.
  384. if not gzip:
  385.     del ReadTestGzip
  386.     del ReadStreamTestGzip
  387.     del WriteTestGzip
  388.     del WriteStreamTestGzip
  389.  
  390. def test_main():
  391.     if gzip:
  392.         # create testtar.tar.gz
  393.         gzip.open(tarname("gz"), "wb").write(file(tarname(), "rb").read())
  394.     if bz2:
  395.         # create testtar.tar.bz2
  396.         bz2.BZ2File(tarname("bz2"), "wb").write(file(tarname(), "rb").read())
  397.  
  398.     tests = [
  399.         FileModeTest,
  400.         ReadTest,
  401.         ReadStreamTest,
  402.         WriteTest,
  403.         WriteSize0Test,
  404.         WriteStreamTest,
  405.         WriteGNULongTest,
  406.     ]
  407.  
  408.     if hasattr(os, "link"):
  409.         tests.append(ExtractHardlinkTest)
  410.  
  411.     if gzip:
  412.         tests.extend([
  413.             ReadTestGzip, ReadStreamTestGzip,
  414.             WriteTestGzip, WriteStreamTestGzip
  415.         ])
  416.  
  417.     if bz2:
  418.         tests.extend([
  419.             ReadTestBzip2, ReadStreamTestBzip2,
  420.             WriteTestBzip2, WriteStreamTestBzip2
  421.         ])
  422.     try:
  423.         test_support.run_unittest(*tests)
  424.     finally:
  425.         if gzip:
  426.             os.remove(tarname("gz"))
  427.         if bz2:
  428.             os.remove(tarname("bz2"))
  429.         if os.path.exists(dirname()):
  430.             shutil.rmtree(dirname())
  431.         if os.path.exists(tmpname()):
  432.             os.remove(tmpname())
  433.  
  434. if __name__ == "__main__":
  435.     test_main()
  436.